package cn.piflow.local

import cn.piflow._
import cn.piflow.util.Logging
import org.apache.spark.sql.SparkSession
import org.quartz.{Job, JobExecutionContext}

import scala.collection.JavaConversions.mapAsScalaMap

/**
	* @author bluejoe2008@gmail.com
	*/
/**
	* A [[FlowEngine]] backed by a Spark session. Flow jobs are scheduled and
	* tracked by an internal [[FlowJobServiceImpl]], which also serves as the
	* engine's [[StatService]].
	*
	* @param sparkSession the Spark session this engine runs on; it is stopped
	*                     when the engine is shut down
	*/
class SparkFlowEngine(sparkSession: SparkSession) extends FlowEngine with Logging {

	// single service instance fulfils both the JobService and StatService roles
	val jobService = new FlowJobServiceImpl();

	def getStatService(): StatService = jobService;

	def getJobService(): JobService = jobService;

	/** Starts the underlying job service (begins accepting/scheduling flow jobs). */
	def start() = {
		jobService.start();
	}

	/**
		* Stops the job service and the Spark session owned by this engine.
		* Jobs still scheduled at shutdown time are logged as a warning but
		* are not awaited.
		*/
	def shutdown() = {
		if (jobService.getScheduledJobs().nonEmpty)
			logger.warn("Runner is to be shutdown, while there are running jobs!");

		jobService.shutdown();
		// Stop the session this engine was constructed with, rather than
		// SparkSession.builder.getOrCreate().stop(): getOrCreate() could lazily
		// create (and then stop) a fresh session if none is active, and might
		// target a different session than the injected one.
		sparkSession.stop();
	}
}

/**
	* Quartz [[Job]] adapter that pulls a [[FlowGraph]] and the owning
	* [[FlowJobServiceImpl]] out of the job's data map and executes the graph
	* via a [[FlowJobExecutor]].
	*
	* The scheduler is expected to have populated the JobDataMap with entries
	* "flowGraph" and "jobService" before triggering this job.
	*/
class FlowGraphJob extends Job with Logging {
	override def execute(ctx: JobExecutionContext) = {
		val dataMap = ctx.getJobDetail.getJobDataMap;
		// Use JobDataMap.get directly instead of the deprecated implicit
		// scala.collection.JavaConversions (removed in Scala 2.13); the map is
		// a plain java.util.Map, so no conversion is needed for key lookup.
		val flowGraph = dataMap.get("flowGraph").asInstanceOf[FlowGraph];
		val jobService = dataMap.get("jobService").asInstanceOf[FlowJobServiceImpl];
		new FlowJobExecutor(jobService).executeFlowGraph(flowGraph, ctx);
	}
}